Conversation

@kingcrimsontianyu (Contributor) commented Sep 29, 2025

Description

For HTTP-based remote files, Polars performs an additional check by sending a HEAD request to the server. WebHDFS does not accept HEAD requests, which causes PDS-H to error out:

OSError: object-store error: Generic HTTP error: Error performing HEAD http://localhost:9870/webhdfs/v1/home/tialiu/scale-100.0/supplier.parquet in 507.238µs - Server returned non-2xx status code: 400 Bad Request:
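The failure is easy to reproduce outside of Polars with a bare HEAD request; a minimal sketch, assuming the same WebHDFS endpoint as in the error above:

# Minimal reproduction sketch: a plain HEAD request against WebHDFS,
# using the same URL as the error message above.
import urllib.error
import urllib.request

url = "http://localhost:9870/webhdfs/v1/home/tialiu/scale-100.0/supplier.parquet"
req = urllib.request.Request(url, method="HEAD")
try:
    urllib.request.urlopen(req)
except urllib.error.HTTPError as e:
    print(e.code)  # 400 Bad Request, matching the object-store error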

This PR provides a workaround that enables the PDS-H benchmark over WebHDFS without requiring an upstream Polars change.

To enable this feature, users specify two new environment variables, LIBCUDF_IO_REROUTE_LOCAL_DIR_PATTERN and LIBCUDF_IO_REROUTE_REMOTE_DIR_PATTERN. At runtime, any local file path is rewritten in memory (the original file on disk is untouched) so that the first occurrence of the local dir pattern is replaced by the remote dir pattern, and a remote file resource is used in place of a local file resource.
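A minimal sketch of the intended usage (the local dummy path here is hypothetical; the remote URL mirrors the one in the error message above, and the variables are assumed to be read when the scan executes):

# Hypothetical local dummy path; the remote URL follows the error
# message above. Set the variables before any scan runs.
import os

os.environ["LIBCUDF_IO_REROUTE_LOCAL_DIR_PATTERN"] = "/data/dummy/scale-100.0"
os.environ["LIBCUDF_IO_REROUTE_REMOTE_DIR_PATTERN"] = (
    "http://localhost:9870/webhdfs/v1/home/tialiu/scale-100.0"
)

import polars as pl

# The scan is written against the local dummy file; the read is
# rerouted to the corresponding remote WebHDFS file at runtime.
q = pl.scan_parquet("/data/dummy/scale-100.0/supplier.parquet")
print(q.collect(engine="gpu"))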

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

copy-pr-bot (bot) commented Sep 29, 2025

This pull request requires additional validation before any workflows can run on NVIDIA's runners.


github-actions (bot) added the libcudf, Python, and cudf-polars labels Sep 29, 2025
GPUtester moved this to In Progress in cuDF Python Sep 29, 2025
kingcrimsontianyu added the improvement and non-breaking labels Sep 29, 2025
@kingcrimsontianyu (Contributor, Author) commented

This solution works for both the regular (non-partitioned) file layout and the partitioned layout. The following Python script was used to generate a dummy local directory tree that mirrors the remote directory but with small files, regardless of the remote scale factor.

Python script to generate dummy local data whose total size is on the order of scale factor 0.01, regardless of the size of the remote files
# Copyright (c) 2025, NVIDIA CORPORATION.

# https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/WebHDFS.html

import os
import subprocess
import json
import re
from typing import Dict, List, Tuple, Any


class DummyDataGenerator:
    """
    A utility class for creating local dummy TPC-H benchmark data that mirrors remote HDFS directory structures.

    This class connects to a remote HDFS system via WebHDFS REST API, analyzes the directory structure,
    and creates a local copy with the same hierarchy but populated with smaller dummy files generated
    using tpchgen-cli with a scale factor of 0.01.

    Parameters
    ----------
    source_dir_url : str
        The WebHDFS URL of the source directory on HDFS. Must be a remote path with a valid
        RFC 3986-conformant URL scheme (e.g., 'http://hostname:port/webhdfs/v1/path').
    dest_dummy_dir : str
        The local directory path where the dummy data structure will be created.
    """

    def __init__(self, source_dir_url: str, dest_dummy_dir: str) -> None:
        """Initialize the DummyDataGenerator with source and destination paths."""
        self.source_dir_url = source_dir_url
        self.dest_dummy_dir = dest_dummy_dir

        # If a file path has an RFC 3986-conformant URL scheme, we
        # consider it a remote path
        self.is_remote = bool(
            re.match(r"[a-zA-Z][a-zA-Z0-9+.-]*://", self.source_dir_url))
        if not self.is_remote:
            raise ValueError("source_dir_url must be a remote path")

    def _send_request(self, url: str, op: str) -> Dict[str, Any]:
        """
        Send an HTTP request to the WebHDFS REST API.

        Parameters
        ----------
        url : str
            The base WebHDFS URL for the resource.
        op : str
            The WebHDFS operation string (e.g., '?op=LISTSTATUS').

        Returns
        -------
        Dict[str, Any]
            The JSON response from the WebHDFS API deserialized as a Python dictionary.
        """
        full_url = f"{url}{op}"
        # Pass the arguments as a list so the URL is not mangled by
        # shell-style splitting
        completed_proc = subprocess.run(
            ["curl", "-s", full_url],
            capture_output=True, text=True, check=True)
        assert completed_proc.stderr == ""
        # Deserialize the JSON response into a Python object
        return json.loads(completed_proc.stdout)

    def _list_dir(self, url: str) -> List[Dict[str, Any]]:
        """
        List the contents of a directory using WebHDFS LISTSTATUS operation.

        Parameters
        ----------
        url : str
            The WebHDFS URL of the directory to list.

        Returns
        -------
        List[Dict[str, Any]]
            A list of file status dictionaries, where each dictionary contains
            metadata about a file or directory (type, pathSuffix, length, etc.).

        Notes
        -----
        LISTSTATUS can be applied to both directories and files. For a file,
        the value of "pathSuffix" is empty, while for a directory, it is non-empty.
        """
        result = self._send_request(url, "?op=LISTSTATUS")
        return result["FileStatuses"]["FileStatus"]

    def _get_source_dir_info(self) -> Tuple[Dict[str, List[str]], bool]:
        """
        Analyze the source directory structure on HDFS to determine layout and file organization.

        Returns
        -------
        Tuple[Dict[str, List[str]], bool]
            A tuple containing:
            - source_file_info: Dictionary mapping table names to lists of parquet file names
            - is_partitioned: Boolean indicating whether the data uses partitioned layout

        Notes
        -----
        This method detects two possible TPC-H data layouts:

        1. Regular (non-partitioned) layout::

            source_dir_url/
            ├── customer.parquet
            ├── lineitem.parquet
            └── ...

        2. Partitioned layout::

            source_dir_url/
            ├── customer/
            │   ├── part-1.parquet
            │   └── part-2.parquet
            ├── lineitem/
            │   ├── part-1.parquet
            │   └── part-2.parquet
            └── ...

        The method prints the number of files found for each table during discovery.
        """

        is_partitioned = True
        file_info_list = self._list_dir(self.source_dir_url)
        source_file_info = {}
        for file_info in file_info_list:
            if file_info["type"] == "FILE":
                is_partitioned = False
                break

            # table_name is customer, lineitem, etc
            table_name = file_info["pathSuffix"]
            source_file_info[table_name] = []

            source_subdir_url = f"{self.source_dir_url}/{table_name}"

            result_list = self._list_dir(source_subdir_url)
            for result in result_list:
                # Each directory contains a file "_SUCCESS" which is not
                # needed.
                if result["pathSuffix"].endswith(".parquet"):
                    source_file_info[table_name].append(result["pathSuffix"])

        for table_name, file_name_list in source_file_info.items():
            num_files = len(file_name_list)
            print(f"--> {table_name} has {num_files} files")

        return (source_file_info, is_partitioned)

    def _create_local_dummy_data(self, source_file_info: Dict[str, List[str]], is_partitioned: bool) -> None:
        """
        Create local dummy data based on the detected layout type.

        Parameters
        ----------
        source_file_info : Dict[str, List[str]]
            Dictionary mapping table names to lists of parquet file names.
        is_partitioned : bool
            Whether to create partitioned or regular layout.
        """
        if is_partitioned:
            self._dummy_for_partitioned_layout(source_file_info)
        else:
            self._dummy_for_regular_layout()

    def _dummy_for_regular_layout(self) -> None:
        """
        Create dummy data files for regular (non-partitioned) TPC-H layout.

        This method generates a simple flat directory structure with one parquet file
        per TPC-H table using tpchgen-cli with scale factor 0.01.
        """
        os.makedirs(self.dest_dummy_dir, exist_ok=True)

        full_command = f"tpchgen-cli --output-dir {self.dest_dummy_dir} --format=parquet --scale-factor=0.01"
        completed_proc = subprocess.run(
            full_command.split(), capture_output=True, text=True, check=True)
        assert completed_proc.stderr == ""

    def _dummy_for_partitioned_layout(self, source_file_info: Dict[str, List[str]]) -> None:
        """
        Create dummy data files for partitioned TPC-H layout.

        This method creates a directory hierarchy where each table has its own subdirectory
        containing multiple partitioned parquet files. File names are matched exactly to
        the source structure.

        Parameters
        ----------
        source_file_info : Dict[str, List[str]]
            Dictionary mapping table names to lists of parquet file names that should
            be created in each table's subdirectory.

        Notes
        -----
        For each partition, this method:
        1. Generates a small parquet file using tpchgen-cli with the appropriate part number
        2. Renames the generated file to match the source file name exactly
        """
        os.makedirs(self.dest_dummy_dir, exist_ok=True)

        for table_name, file_name_list in source_file_info.items():
            # Create directories
            table_dir = os.path.join(self.dest_dummy_dir, table_name)
            os.makedirs(table_dir, exist_ok=True)

            # Create small files
            num_partitions = len(file_name_list)
            for part_idx in range(num_partitions):
                full_command = f"tpchgen-cli --output-dir {table_dir} --format=parquet --scale-factor=0.01 "
                full_command += f"--tables {table_name} --parts {num_partitions} --part {part_idx+1}"
                completed_proc = subprocess.run(
                    full_command.split(), capture_output=True, text=True,
                    check=True)
                assert completed_proc.stderr == ""

                # Change file name
                current_file_name = os.path.join(
                    table_dir, f"{table_name}.parquet")
                new_file_name = os.path.join(
                    table_dir, file_name_list[part_idx])
                print(new_file_name)
                os.rename(current_file_name, new_file_name)

    def run(self) -> None:
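        """Discover the remote directory layout and generate the matching local dummy tree."""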
        source_file_info, is_partitioned = self._get_source_dir_info()
        self._create_local_dummy_data(source_file_info, is_partitioned)


if __name__ == "__main__":
    # Example 1: Partitioned layout
    # source_dir_url = "http://<server>:9870/webhdfs/v1/home/tialiu/partitioned/scale-10.0"
    # dest_dummy_dir = "partitioned/scale-100.0"

    # Example 2: Regular (non-partitioned) layout
    source_dir_url = "http://<server>:9870/webhdfs/v1/home/tialiu/scale-10.0"
    dest_dummy_dir = "regular/scale-100.0"

    ddg = DummyDataGenerator(source_dir_url, dest_dummy_dir)
    ddg.run()
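As a side note, the LISTSTATUS call in the script shells out to curl; the same query can be made with only the standard library. A minimal sketch, assuming the same unauthenticated WebHDFS endpoint as the script above:

# Alternative to the curl-based _send_request above, using urllib.
# Assumes an unauthenticated WebHDFS endpoint.
import json
import urllib.request
from typing import Any, Dict, List


def list_status(url: str) -> List[Dict[str, Any]]:
    """Return the FileStatus entries for a WebHDFS directory or file."""
    with urllib.request.urlopen(f"{url}?op=LISTSTATUS") as resp:
        return json.load(resp)["FileStatuses"]["FileStatus"]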
